/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements.  See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
	"context"
	"fmt"
	rocketmq "gitee.com/zackeus/go-boot/rocketmq"
	"gitee.com/zackeus/go-boot/rocketmq/consumer"
	"gitee.com/zackeus/go-boot/rocketmq/primitive"
	"gitee.com/zackeus/go-boot/rocketmq/rlog"
	"gitee.com/zackeus/go-zero/core/logx"
	"gitee.com/zackeus/goutil"
	"os"
	"os/signal"
	"syscall"
	"time"
)

const (
	Topic         = "cti-retry-topic"
	ConsumerGroup = "cti-retry-group"
	Endpoint      = "rmq.nameserver1.server:9876"
	AccessKey     = "testuser"
	SecretKey     = "yulon123"
)

// 并发消费重试
// use concurrent consumer model, when Subscribe function return consumer.ConsumeRetryLater, the message will be
// send to RocketMQ retry topic. we could set DelayLevelWhenNextConsume in ConsumeConcurrentlyContext, which used to
// indicate the delay of message re-send to origin topic from retry topic.
//
// in this example, we always set DelayLevelWhenNextConsume=1, means that the message will be sent to origin topic after
// 1s. in case of the unlimited retry, we will return consumer.ConsumeSuccess after ReconsumeTimes > 5
func main() {
	if err := logx.SetUp(logx.LogConf{Mode: "console", Encoding: "plain"}); err != nil {
		fmt.Println(err)
		return
	}
	rlog.SetLevel("error")

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGUSR1, syscall.SIGUSR2, syscall.SIGTERM, syscall.SIGINT)
	c, _ := rocketmq.NewPushConsumer(
		consumer.WithGroupName(ConsumerGroup),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{Endpoint})),
		consumer.WithConsumerModel(consumer.Clustering),
		consumer.WithConsumeFromWhere(consumer.ConsumeFromLastOffset),
		/* 最大重试次数 超过次数会送入死信队列 */
		consumer.WithMaxReconsumeTimes(5),
		/* 鉴权消息 */
		consumer.WithCredentials(primitive.Credentials{
			AccessKey: AccessKey,
			SecretKey: SecretKey,
		}),
	)

	// DelayLevel 延迟等级 指定下次重新消费之前的等待时间，现在范围是1到18
	// -1 不重试，直接发往死信队列
	// The time of each level is the value of indexing of {level-1} in [1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h]
	delayLevel := 3
	err := c.Subscribe(Topic, consumer.MessageSelector{}, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {

		concurrentCtx, _ := primitive.GetConcurrentlyCtx(ctx)
		/* only run when return consumer.ConsumeRetryLater */
		/* DelayLevelWhenNextConsume 指定重试间隔时间 不指定则间隔时间随重试次数增加而递增(默认初始间隔为 10s) */
		concurrentCtx.DelayLevelWhenNextConsume = delayLevel

		for _, msg := range msgs {
			fmt.Println("now: ", time.Now())
			if msg.ReconsumeTimes > 5 {
				fmt.Printf("msg ReconsumeTimes > 5. msg: %v", msg)
				//return consumer.ConsumeSuccess, nil
			} else {
				fmt.Printf("subscribe callback: %v \n", msg)
			}
		}
		return consumer.ConsumeRetryLater, nil
	})
	goutil.PanicErr(err)
	// Note: start after subscribe
	err = c.Start()
	goutil.PanicErr(err)

	<-signals
	err = c.Shutdown()
	if err != nil {
		fmt.Printf("shundown Consumer error: %s", err.Error())
	}
}
